package com.comtom.soft.thrift.client;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.curator.framework.CuratorFramework;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.async.TAsyncClient;
import org.apache.thrift.async.TAsyncClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;

/**
 * Spring {@link FactoryBean} that exposes a dynamic proxy implementing a
 * Thrift service's {@code $AsyncIface}.
 *
 * <p>Every call made on the proxy borrows an {@link AsyncClientAdapter} from
 * an internal {@link GenericObjectPool}, waits until that adapter reports no
 * call in progress, marks it busy and then dispatches the call reflectively
 * to the adapter's {@link TAsyncClient}. When the last argument is an
 * {@link AsyncMethodCallback} it is replaced by an {@link AbStractCallBack}
 * wrapping the caller's callback together with the adapter.
 *
 * <p>If no {@link ThriftServerProvider} is injected, a
 * {@link ThriftServerProviderZookeeper} is created from the configured
 * {@code service} and {@code zkClient}.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public class AsynThriftServiceClientProxyFactory implements FactoryBean, InitializingBean {

	/** Interval, in milliseconds, between checks of an adapter's busy flag. */
	private static final long BUSY_POLL_INTERVAL_MS = 100L;

	private final Logger logger = LoggerFactory.getLogger(getClass());

	// Optional pool configuration; when set it takes precedence over the individual fields below.
	private ThriftClienPoolConfig thriftClienPoolConfig;

	// Maximum number of active (borrowed) clients.
	private Integer maxActive = 32;

	// Maximum time in milliseconds to wait for a client before borrowing fails.
	private Integer maxWait = 180000;

	// Maximum number of idle clients kept in the pool.
	private Integer maxIdle = 5;

	// Minimum number of idle clients kept in the pool.
	private Integer minIdle = 0;

	private CuratorFramework zkClient;

	private String service;

	private ThriftServerProvider serverAddressProvider;

	private Object proxyClient;
	private Class<?> objectClass;

	private GenericObjectPool<AsyncClientAdapter> pool;

	// Pool life-cycle hook: only logs creation and destruction of pooled clients.
	private AyncThriftClientPoolFactory.PoolOperationCallBack callback = new AyncThriftClientPoolFactory.PoolOperationCallBack() {
		@Override
		public void make(AsyncClientAdapter client) {
			logger.info("create");
		}

		@Override
		public void destroy(AsyncClientAdapter client) {
			logger.info("destroy");
		}
	};

	/**
	 * @param maxActive maximum number of active clients (used when no pool config is set)
	 */
	public void setMaxActive(Integer maxActive) {
		this.maxActive = maxActive;
	}

	/**
	 * @param maxWait maximum borrow wait in milliseconds (used when no pool config is set)
	 */
	public void setMaxWait(Integer maxWait) {
		this.maxWait = maxWait;
	}

	/**
	 * @param maxIdle maximum number of idle clients (used when no pool config is set)
	 */
	public void setMaxIdle(Integer maxIdle) {
		this.maxIdle = maxIdle;
	}

	/**
	 * @param minIdle minimum number of idle clients (used when no pool config is set)
	 */
	public void setMinIdle(Integer minIdle) {
		this.minIdle = minIdle;
	}

	/**
	 * @param serverAddressProvider provider of the service name and server addresses;
	 *        when left unset a ZooKeeper-backed provider is created in
	 *        {@link #afterPropertiesSet()}
	 */
	public void setServerAddressProvider(ThriftServerProvider serverAddressProvider) {
		this.serverAddressProvider = serverAddressProvider;
	}

	/**
	 * Resolves the server provider, loads the generated Thrift classes, builds
	 * the client pool and creates the proxy returned by {@link #getObject()}.
	 *
	 * @throws Exception if the provider cannot be initialised or the
	 *         {@code $AsyncIface} / {@code $AsyncClient$Factory} classes cannot be loaded
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		if (serverAddressProvider == null) {
			ThriftServerProviderZookeeper zookeeperProvider = new ThriftServerProviderZookeeper();
			zookeeperProvider.setService(service);
			zookeeperProvider.setZkClient(zkClient);
			zookeeperProvider.afterPropertiesSet();
			serverAddressProvider = zookeeperProvider;
		}

		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
		if (classLoader == null) {
			// The context class loader may legitimately be null; fall back to our own loader.
			classLoader = getClass().getClassLoader();
		}

		String serviceName = serverAddressProvider.getService();
		// Load the generated asynchronous service interface.
		objectClass = classLoader.loadClass(serviceName + "$AsyncIface");
		// Load the generated asynchronous client factory.
		Class<TAsyncClientFactory<TAsyncClient>> factoryClass = (Class<TAsyncClientFactory<TAsyncClient>>) classLoader.loadClass(serviceName + "$AsyncClient$Factory");

		AyncThriftClientPoolFactory clientPool = new AyncThriftClientPoolFactory(serverAddressProvider, factoryClass, callback);
		pool = new GenericObjectPool<AsyncClientAdapter>(clientPool, buildPoolConfig());

		proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {
			@Override
			public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
				if (method.getDeclaringClass() == Object.class) {
					// equals/hashCode/toString must not consume a pooled connection.
					return invokeObjectMethod(proxy, method, args);
				}
				return invokeOnPooledClient(method, args);
			}
		});
	}

	/**
	 * Builds the pool configuration from {@code thriftClienPoolConfig} when
	 * present, otherwise from the individual limit fields. Clients are always
	 * validated on borrow and never on return or while idle.
	 */
	private GenericObjectPool.Config buildPoolConfig() {
		GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
		if (thriftClienPoolConfig != null) {
			poolConfig.maxActive = thriftClienPoolConfig.getMaxActive();
			poolConfig.maxWait = thriftClienPoolConfig.getMaxWait();
			poolConfig.maxIdle = thriftClienPoolConfig.getMaxIdle();
			poolConfig.minIdle = thriftClienPoolConfig.getMinIdle();
		} else {
			poolConfig.maxActive = maxActive;
			poolConfig.maxWait = maxWait;
			poolConfig.maxIdle = maxIdle;
			poolConfig.minIdle = minIdle;
		}
		poolConfig.testOnBorrow = true;
		poolConfig.testOnReturn = false;
		poolConfig.testWhileIdle = false;
		return poolConfig;
	}

	/**
	 * Answers {@code equals}, {@code hashCode} and {@code toString} for the
	 * proxy itself using identity semantics.
	 */
	private Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
		String name = method.getName();
		if ("equals".equals(name)) {
			return proxy == args[0];
		}
		if ("hashCode".equals(name)) {
			return System.identityHashCode(proxy);
		}
		return "AsyncThriftClientProxy[" + objectClass.getName() + "]";
	}

	/**
	 * Borrows a client, waits for it to become idle, dispatches the call and
	 * hands the client back to the pool.
	 *
	 * <p>The adapter is returned to the pool after a successful dispatch or
	 * when the wait is interrupted (it is still serving another call), and is
	 * invalidated on any other failure.
	 *
	 * @throws Throwable the exception raised by the target method (unwrapped
	 *         from {@link InvocationTargetException}), or any pool/reflection failure
	 */
	private Object invokeOnPooledClient(Method method, Object[] args) throws Throwable {
		AsyncClientAdapter adapter = pool.borrowObject();
		boolean succeeded = false;
		boolean interruptedWhileWaiting = false;
		try {
			TAsyncClient client = adapter.getAsyncClient();
			try {
				// The adapter can still be running an earlier call; wait for it to finish.
				while (adapter.isCallIsRuning()) {
					Thread.sleep(BUSY_POLL_INTERVAL_MS);
				}
			} catch (InterruptedException e) {
				interruptedWhileWaiting = true;
				Thread.currentThread().interrupt();
				throw e;
			}
			// Mark the adapter busy for the duration of this call.
			adapter.setCallIsRuning(true);
			// NOTE(review): the busy flag is presumably cleared by AbStractCallBack on
			// completion; when the last argument is not a callback nothing visible here
			// resets it - confirm against AbStractCallBack.
			if (args != null && args.length > 0 && args[args.length - 1] instanceof AsyncMethodCallback) {
				AsyncMethodCallback userCallback = (AsyncMethodCallback) args[args.length - 1];
				args[args.length - 1] = new AbStractCallBack(userCallback, adapter);
			}
			Object result = method.invoke(client, args);
			succeeded = true;
			return result;
		} catch (InvocationTargetException e) {
			// Surface the target's own exception so callers see what the interface declares
			// instead of an UndeclaredThrowableException.
			Throwable cause = e.getCause();
			throw cause != null ? cause : e;
		} finally {
			if (succeeded || interruptedWhileWaiting) {
				pool.returnObject(adapter);
			} else {
				pool.invalidateObject(adapter);
			}
		}
	}

	/**
	 * @return the proxy created in {@link #afterPropertiesSet()}
	 */
	@Override
	public Object getObject() throws Exception {
		return proxyClient;
	}

	/**
	 * @return the loaded {@code $AsyncIface} class, or {@code null} before initialisation
	 */
	@Override
	public Class<?> getObjectType() {
		return objectClass;
	}

	@Override
	public boolean isSingleton() {
		return true;
	}

	/**
	 * @param zkClient the zkClient to set
	 */
	public void setZkClient(CuratorFramework zkClient) {
		this.zkClient = zkClient;
	}

	/**
	 * @param service the service to set
	 */
	public void setService(String service) {
		this.service = service;
	}


	/**
	 * @param thriftClienPoolConfig the thriftClienPoolConfig to set
	 */
	public void setThriftClienPoolConfig(ThriftClienPoolConfig thriftClienPoolConfig) {
		this.thriftClienPoolConfig = thriftClienPoolConfig;
	}

	/**
	 * Releases the client pool and then the server address provider. A failure
	 * to close the pool is logged and does not prevent the provider from being closed.
	 */
	public void close() {
		if (pool != null) {
			try {
				pool.close();
			} catch (Exception e) {
				logger.warn("Failed to close thrift async client pool", e);
			}
		}
		if (serverAddressProvider != null) {
			serverAddressProvider.close();
		}
	}
}
